-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add milvus vector db integration #419
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking time to add a new vector db integration and making corrections to the mongo file. Please have a look at the requested changes.
@AbhishekRP2002 Please add an example config in the compose.env and comment so anyone who wants to use Milvus can refer to it. |
@AbhishekRP2002 PR looks good to me. Regarding your question about testing
We do not have a unit testing setup at the moment. Can you please post a few screenshots/videos showing that the basic operations work? If all looks good, we can go ahead and merge the PR. |
yes yes, I wrote a simple test.py script and encountered some bugs, I am fixing them and re-checking, will push the final changes in sometime. |
Thanks for the detailed explanation @AbhishekRP2002. Please let me know once the PR is ready with the updated test.py and we can go ahead and merge. Also, please resolve the review comments. |
Resolved the review comments. Updated test.py file as in ? I think I cannot add it in the codebase right ? |
LGTM 🚀
I think its best if we resolve testing as a separate setup later on. Looks good for now. |
@AbhishekRP2002 Please resolve the precommit errors. To fix this issue, you can run something like |
Sure. Will do it. |
Head branch was pushed to by a user without write access
hi @mnvsk97 i have fixed the pre-commit errors. |
Merged. Thanks for the contribution. Happy new year ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, this is Jinhong from Milvus, based on the existing milvus integration for cognita, I have added a few more functionalities! Feel free to let me know what you think!
metric_type=self.metric_type, # https://milvus.io/docs/metric.md#Metric-Types : check for other supported metrics | ||
schema=schema, | ||
auto_id=True, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, this is Jinhong from Milvus. We have transitioned to the new MilvusClient interface and are actively working to phase out the use of the older ORM interface. In this context, your current approach to adding fields to a collection may be updated to the following:
schema = self.milvus_client.create_schema(
auto_id=False, enable_dynamic_field=True
)
schema.add_field(
field_name="id",
datatype=DataType.INT64,
is_primary=True,
)
schema.add_field(
field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=vector_size
)
schema.add_field(
field_name="text",
datatype=DataType.VARCHAR,
max_length=65535,
)
schema.add_field(
field_name="metadata",
datatype=DataType.JSON,
)
index_params = self.milvus_client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="FLAT",
metric_type=self.metric_type,
)
self.milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
)
collection_name=collection_name, filter=delete_expr | ||
) | ||
|
||
logger.debug(f"[Milvus] Deleted {len(data_point_vectors)} data point vectors") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding the following 3 functions: list_documents_in_collection
, delete_documents
, and list_document_vector_points
Here is my implementation based on your code:
def list_documents_in_collection(
self, collection_name: str, base_document_id: str = None
) -> List[str]:
"""
List all documents in a collection.
Args:
collection_name (str): The name of the collection.
base_document_id (str, optional): Base document ID for filtering. Defaults to None.
Returns:
List[str]: List of document IDs.
"""
logger.debug(
f"[Milvus] Listing all documents with base document ID {base_document_id} for collection {collection_name}"
)
stop = False
offset = 0
document_ids_set = set()
while not stop:
filter_expr = (
f'metadata["{DATA_POINT_FQN_METADATA_KEY}"] == "{base_document_id}"'
if base_document_id
else ""
)
search_result = self.milvus_client.query(
collection_name=collection_name,
filter=filter_expr,
output_fields=["metadata"],
limit=BATCH_SIZE,
offset=offset,
)
if not search_result:
stop = True
break
for doc in search_result:
metadata = doc.get("metadata", {})
if metadata.get(DATA_POINT_FQN_METADATA_KEY):
document_ids_set.add(metadata.get(DATA_POINT_FQN_METADATA_KEY))
if len(document_ids_set) > MAX_SCROLL_LIMIT:
stop = True
break
if len(search_result) < BATCH_SIZE:
stop = True
else:
offset += BATCH_SIZE
logger.debug(
f"[Milvus] Found {len(document_ids_set)} documents with base document ID {base_document_id} in collection {collection_name}"
)
return list(document_ids_set)
def delete_documents(self, collection_name: str, document_ids: List[str]):
"""
Delete documents from a collection based on document IDs.
Args:
collection_name (str): The name of the collection.
document_ids (List[str]): List of document IDs to delete.
"""
logger.debug(
f"[Milvus] Deleting {len(document_ids)} documents from collection {collection_name}"
)
if not document_ids:
logger.warning("[Milvus] No document IDs provided for deletion.")
return
try:
for i in range(0, len(document_ids), BATCH_SIZE):
document_ids_to_delete = document_ids[i : i + BATCH_SIZE]
delete_expr = " or ".join(
[
f'metadata["{DATA_POINT_FQN_METADATA_KEY}"] == "{doc_id}"'
for doc_id in document_ids_to_delete
]
)
self.milvus_client.delete(
collection_name=collection_name, filter=delete_expr
)
logger.debug(
f"[Milvus] Deleted {len(document_ids)} documents from collection {collection_name}"
)
except Exception as exp:
logger.error(f"[Milvus] Error deleting documents: {exp}")
def list_document_vector_points(
self, collection_name: str
) -> List[DataPointVector]:
"""
List all document vector points in a collection.
Args:
collection_name (str): The name of the collection.
Returns:
List[DataPointVector]: List of vector points with metadata.
"""
logger.debug(
f"[Milvus] Listing all document vector points for collection {collection_name}"
)
stop = False
offset = 0
document_vector_points: List[DataPointVector] = []
while not stop:
search_result = self.milvus_client.query(
collection_name=collection_name,
output_fields=["id", "metadata"],
limit=BATCH_SIZE,
offset=offset,
)
if not search_result:
stop = True
break
for doc in search_result:
metadata = doc.get("metadata", {})
if metadata.get(DATA_POINT_FQN_METADATA_KEY) and metadata.get(
DATA_POINT_HASH_METADATA_KEY
):
document_vector_points.append(
DataPointVector(
data_point_vector_id=str(doc["id"]),
data_point_fqn=metadata.get(DATA_POINT_FQN_METADATA_KEY),
data_point_hash=metadata.get(DATA_POINT_HASH_METADATA_KEY),
)
)
if len(document_vector_points) > MAX_SCROLL_LIMIT:
stop = True
break
if len(search_result) < BATCH_SIZE:
stop = True
else:
offset += BATCH_SIZE
logger.debug(
f"[Milvus] Listed {len(document_vector_points)} document vector points for collection {collection_name}"
)
return document_vector_points
Customized setup
, I am thinking.